package sputils

import (
	"fmt"
	"gitee.com/ymofen/gobase/subpub"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

// subchannelItem is one publish channel together with its subscribers.
type subchannelItem struct {
	// id is the channel name; it is also the key of this item in
	// Subchannel.channellst.
	id            string
	// sesslst maps a subscriber id to its callback.
	sesslst       map[string]subpub.SubFunc
	// idlst and fnlst are parallel snapshots of sesslst (same index = same
	// subscriber). They are rebuilt as fresh slices by
	// innerReloadSubSessionFnlst and are what Pub iterates over.
	idlst         []string
	fnlst         []subpub.SubFunc
	// lastActivityT is the Unix time (seconds) of the most recent Pub on this
	// channel; CleanChannels compares it against its idle threshold.
	lastActivityT int64
}

// Subchannel is a subscription hub. According to the original author,
// subscribing and unsubscribing here is much faster than with "subscribe".
// Channel names are looked up by exact key: wildcards are not supported.
type Subchannel struct {
	// channelAliveN counts subchannelItem values that have been created and
	// not yet finalized by the garbage collector (updated atomically).
	channelAliveN int32
	// lockcnt is incremented atomically by ClosePubChannel; nothing in this
	// file reads it.
	lockcnt       int32

	// lk guards channellst and the items stored in it.
	lk sync.RWMutex

	// channellst is the channel registry, keyed by channel name.
	channellst map[string]*subchannelItem
}

// DefaultSubchannel is a package-level Subchannel created with NewSubchannel
// during package initialisation.
var DefaultSubchannel = NewSubchannel()

// NewSubchannel returns an empty Subchannel whose channel registry is already
// allocated, so it can be used immediately.
func NewSubchannel() *Subchannel {
	sc := new(Subchannel)
	sc.channellst = map[string]*subchannelItem{}
	return sc
}

// Close walks the whole registry under the write lock and offers every channel
// to innerFreeChannel. innerFreeChannel refuses channels that still have
// subscribers, so those stay registered. The returned error is always nil.
func (this *Subchannel) Close() error {
	this.lk.Lock()
	defer this.lk.Unlock()

	// Deleting map entries while ranging over the map is safe in Go.
	for _, item := range this.channellst {
		this.innerFreeChannel(item)
	}
	return nil
}

// checkGetChannel looks up channel in the registry. When it is missing and new
// is true, a fresh item is created, registered and returned; otherwise the
// lookup result (possibly nil) is returned as is.
//
// The function does no locking itself; the callers in this file hold this.lk
// (the write lock whenever new is true).
func (this *Subchannel) checkGetChannel(channel string, new bool) *subchannelItem {
	existing := this.channellst[channel]
	if existing != nil || !new {
		return existing
	}

	created := &subchannelItem{
		id:      channel,
		sesslst: make(map[string]subpub.SubFunc),
	}
	// channelAliveN tracks items that exist in memory: it goes up here and
	// comes back down only when the garbage collector finalizes the item.
	atomic.AddInt32(&this.channelAliveN, 1)
	runtime.SetFinalizer(created, func(obj interface{}) {
		atomic.AddInt32(&this.channelAliveN, -1)
	})
	this.channellst[channel] = created
	return created
}

// Status returns a one-line summary of the hub:
//
//	subtopic-n:<channels>, channel:<channels>, alive:<items not yet finalized>
//
// Both of the first two fields report the number of registered channels. The
// count is taken once through GetChannelCount (which holds the read lock)
// rather than by reading the map unlocked, and the alive counter is loaded
// atomically because it is updated atomically elsewhere.
func (this *Subchannel) Status() string {
	channels := this.GetChannelCount()
	alive := atomic.LoadInt32(&this.channelAliveN)
	return fmt.Sprintf("subtopic-n:%d, channel:%d, alive:%d", channels, channels, alive)
}

// GetChannelSessionCount returns the number of subscribers on channel, or 0
// when the channel is not registered. It holds the read lock for the lookup.
func (this *Subchannel) GetChannelSessionCount(channel string) int {
	this.lk.RLock()
	defer this.lk.RUnlock()

	if found := this.checkGetChannel(channel, false); found != nil {
		return len(found.sesslst)
	}
	return 0
}

// GetChannelCount returns how many channels are currently registered, read
// under the read lock.
func (this *Subchannel) GetChannelCount() int {
	this.lk.RLock()
	total := len(this.channellst)
	this.lk.RUnlock()
	return total
}

// innerReloadSubSessionFnlst rebuilds the item's parallel id/callback slices
// from its subscriber map. New slices are allocated on every call instead of
// editing the old ones in place, so a slice header copied out earlier keeps
// pointing at unchanged data.
func (this *Subchannel) innerReloadSubSessionFnlst(itm *subchannelItem) {
	total := len(itm.sesslst)
	ids := make([]string, 0, total)
	callbacks := make([]subpub.SubFunc, 0, total)

	// Every subscriber goes in; index k of both slices describes the same one.
	for sessionID, callback := range itm.sesslst {
		ids = append(ids, sessionID)
		callbacks = append(callbacks, callback)
	}

	itm.idlst = ids
	itm.fnlst = callbacks
}

// Sub registers cb as subscriber id on channel, creating the channel when it
// does not exist yet. An existing subscription with the same id is replaced.
//
// r is the number of subscribers on the channel after the call.
func (this *Subchannel) Sub(id, channel string, cb subpub.SubFunc) (r int) {
	this.lk.Lock()
	defer this.lk.Unlock()

	target := this.checkGetChannel(channel, true)
	target.sesslst[id] = cb
	this.innerReloadSubSessionFnlst(target)
	return len(target.sesslst)
}

// Unsub removes subscriber id from channel. It takes the write lock itself,
// so callers must not already hold this.lk.
//
// Results:
//   - (-1, false): the channel does not exist;
//   - (0, false): no subscribers are left, and the channel has been released;
//   - (n, true): n > 0 subscribers remain on the channel.
//
// NOTE(review): changed means "subscribers remain", not "id was actually
// removed". An unknown id on a populated channel still yields true. Confirm
// that callers expect this.
func (this *Subchannel) Unsub(id, channel string) (r int, changed bool) {
	this.lk.Lock()
	defer this.lk.Unlock()

	target := this.checkGetChannel(channel, false)
	if target == nil {
		return -1, false
	}

	delete(target.sesslst, id)
	remaining := len(target.sesslst)
	if remaining > 0 {
		this.innerReloadSubSessionFnlst(target)
		return remaining, true
	}

	this.innerFreeChannel(target)
	return 0, false
}

// Pub delivers args to the subscribers of channel by calling each callback as
// fn(subscriberID, channel, args...).
//
// max limits the number of successful deliveries: once max callbacks have
// returned true, the remaining subscribers are skipped. max <= 0 means no
// limit.
//
// Returns:
//   - 0 when the channel does not exist or no callback returned true;
//   - -1 when the id and callback snapshots differ in length (an inconsistent
//     item; nothing is delivered);
//   - otherwise the number of callbacks that returned true.
func (this *Subchannel) Pub(channel string, max int, args ...interface{}) int {
	// The exclusive lock is required because lastActivityT is written here.
	// Under a read lock, concurrent publishers would race on that field. The
	// critical section only copies two slice headers and stamps the time.
	this.lk.Lock()
	itm := this.channellst[channel]
	if itm == nil {
		this.lk.Unlock()
		return 0
	}
	fnlst, idlst := itm.fnlst, itm.idlst
	itm.lastActivityT = time.Now().Unix()
	this.lk.Unlock()

	if len(idlst) != len(fnlst) {
		return -1
	}

	// Callbacks run outside the lock, so they may call back into the hub.
	n := 0
	for idx, fn := range fnlst {
		if !fn(idlst[idx], channel, args...) {
			continue
		}
		n++
		if max > 0 && n >= max {
			break
		}
	}
	return n
}

// innerFreeChannel unregisters itm and drops its subscriber data. The caller
// must hold the write lock.
//
// It returns false and changes nothing when:
//   - itm is nil;
//   - itm still has subscribers;
//   - the registry entry for itm.id is not itm itself. This happens when the
//     caller holds a stale pointer and the name was re-registered in the
//     meantime; deleting by name alone would evict the newer, live channel.
//
// It returns true when itm was removed from the registry.
func (this *Subchannel) innerFreeChannel(itm *subchannelItem) bool {
	if itm == nil || len(itm.sesslst) > 0 {
		return false
	}
	if this.channellst[itm.id] != itm {
		return false
	}
	delete(this.channellst, itm.id)
	itm.sesslst = nil
	itm.fnlst = nil
	itm.idlst = nil
	return true
}

// ClosePubChannel tries to release channel and reports whether it was
// released. The release is delegated to innerFreeChannel, which returns false
// for an unknown channel and for one that still has subscribers.
//
// Every call also bumps the lockcnt counter.
func (this *Subchannel) ClosePubChannel(channel string) (closed bool) {
	this.lk.Lock()
	defer this.lk.Unlock()

	atomic.AddInt32(&this.lockcnt, 1)
	closed = this.innerFreeChannel(this.channellst[channel])
	return closed
}

// CleanChannels releases channels on which nothing has been published for more
// than 10 minutes and returns how many were released. The release itself is
// done by innerFreeChannel, so a channel that innerFreeChannel refuses (for
// example one that still has subscribers) is not counted.
//
// Candidates are collected under the read lock, so the common "nothing to do"
// case never blocks publishers. The read lock is dropped before the write lock
// is taken, so every candidate is re-checked under the write lock. A channel
// that was replaced under the same name, or published to, in between is
// skipped.
func (this *Subchannel) CleanChannels() (cnt int) {
	const idleTimeoutSec = 600 // 10 minutes without a publish

	now := time.Now().Unix()
	var idle []*subchannelItem
	this.lk.RLock()
	for _, itm := range this.channellst {
		if now-itm.lastActivityT > idleTimeoutSec {
			idle = append(idle, itm)
		}
	}
	this.lk.RUnlock()

	if len(idle) == 0 {
		return 0
	}

	this.lk.Lock()
	defer this.lk.Unlock()
	for _, itm := range idle {
		// The registry may have changed while no lock was held.
		if this.channellst[itm.id] != itm || now-itm.lastActivityT <= idleTimeoutSec {
			continue
		}
		if this.innerFreeChannel(itm) {
			cnt++
		}
	}
	return cnt
}
